package sample.persistence.res

import java.io.File
import java.util.concurrent.CountDownLatch

import org.apache.pekko.actor.typed.ActorSystem
import org.apache.pekko.actor.typed.scaladsl.Behaviors
import org.apache.pekko.cluster.sharding.typed.{ ReplicatedSharding, ReplicatedShardingExtension }
import org.apache.pekko.http.scaladsl.Http
import org.apache.pekko.management.scaladsl.PekkoManagement
import org.apache.pekko.persistence.cassandra.testkit.CassandraLauncher
import org.apache.pekko.persistence.typed.ReplicaId
import com.typesafe.config.{ Config, ConfigFactory }
import sample.persistence.res.bank.BankAccount
import sample.persistence.res.counter.{ ThumbsUpCounter, ThumbsUpHttp }

import scala.concurrent.ExecutionContext
import scala.util.{ Failure, Success }

/**
 * Entry point of the replicated event sourcing sample.
 *
 * Supported invocations (see [[MainApp.main]]):
 *  - no arguments: start Apache Cassandra and two nodes in this JVM
 *  - `<port> [<data-center>]`: start a single node
 *  - `cassandra`: start only the Apache Cassandra database
 */
object MainApp {

  // Not referenced inside this object; presumably consumed by the entity providers
  // (ThumbsUpCounter / BankAccount) -- confirm against those files.
  val AllReplicas: Set[ReplicaId] = Set(ReplicaId("eu-west"), ReplicaId("eu-central"))

  /** Help text attached to the error raised for an unrecognized command line. */
  private val Usage: String =
    """Usage: MainApp [<port> [<data-center>] | cassandra]
      |  (no arguments)          start Cassandra and two nodes (eu-west, eu-central) in this JVM
      |  <port> [<data-center>]  start a single node; the data center defaults to eu-west
      |  cassandra               start only the Apache Cassandra database""".stripMargin

  /**
   * Extractor that matches an argument made up solely of digits and yields it as an `Int`.
   * A digit string that does not fit into an `Int` does not match, so it is reported as an
   * unrecognized argument instead of surfacing as a `NumberFormatException`.
   */
  private object PortArgument {
    def unapply(arg: String): Option[Int] =
      if (arg.matches("""\d+""")) scala.util.Try(arg.toInt).toOption else None
  }

  /**
   * Dispatches on the first command line argument.
   *
   * @param args empty to start everything in one JVM; a numeric port (optionally followed by
   *             a data center name, default `eu-west`) to start one node; or `cassandra` to
   *             start only the database and block until the process is killed
   * @throws IllegalArgumentException if the first argument is none of the above
   */
  def main(args: Array[String]): Unit =
    args.headOption match {

      case None =>
        startClusterInSameJvm()

      case Some(PortArgument(port)) =>
        val dc = args.lift(1).getOrElse("eu-west")
        startNode(port, dc)

      case Some("cassandra") =>
        startCassandraDatabase()
        println("Started Apache Cassandra, press Ctrl + C to kill")
        // Park the main thread forever; the latch is never counted down.
        new CountDownLatch(1).await()

      case Some(other) =>
        // Previously this fell through the match and failed with an opaque scala.MatchError.
        throw new IllegalArgumentException(s"Unrecognized argument '$other'.\n$Usage")

    }

  /** Starts the Cassandra database and then one node per data center, all inside this JVM. */
  def startClusterInSameJvm(): Unit = {
    startCassandraDatabase()

    startNode(7345, "eu-west")
    startNode(7355, "eu-central")
  }

  /**
   * Starts an actor system named `ClusterSystem`, initializes replicated sharding for the
   * thumbs-up counter and the bank account and, unless `port` is 0, binds the thumbs-up HTTP
   * route and starts Pekko Management.
   *
   * @param port Artery canonical port; the HTTP server listens on `20000 + port`
   * @param dc   data center of this node, also used as the replica id served by the HTTP route
   */
  def startNode(port: Int, dc: String): Unit = {
    implicit val system: ActorSystem[Nothing] =
      ActorSystem[Nothing](Behaviors.empty[Nothing], "ClusterSystem", config(port, dc))
    implicit val ec: ExecutionContext = system.executionContext

    val replicatedSharding = ReplicatedShardingExtension(system)
    val thumbsUpReplicatedSharding: ReplicatedSharding[ThumbsUpCounter.Command] =
      replicatedSharding.init(ThumbsUpCounter.Provider)

    // There are no HTTP endpoints for bank accounts; this only shows that multiple
    // replicated sharding instances can be started side by side.
    val bankAccountReplicatedSharding: ReplicatedSharding[BankAccount.Command] =
      replicatedSharding.init(BankAccount.Provider)

    if (port != 0) {
      val httpHost = "0.0.0.0"
      val httpPort = 20000 + port
      Http().newServerAt(httpHost, httpPort)
        .bind(ThumbsUpHttp.route(ReplicaId(dc), thumbsUpReplicatedSharding))
        .onComplete {
          case Success(_)  => system.log.info("HTTP Server bound to http://{}:{}", httpHost, httpPort)
          case Failure(ex) => system.log.error(s"Failed to bind HTTP Server to http://$httpHost:$httpPort", ex)
        }
      PekkoManagement(system).start()
    }

  }

  /**
   * Builds the configuration of one node: per-node overrides layered on top of
   * `application.conf`.
   *
   * The management HTTP port is formed textually by putting the digit `1` in front of `port`
   * (7345 becomes 17345), so the result is only a valid TCP port for ports of at most four
   * digits.
   *
   * @param port Artery canonical port
   * @param dc   value for `pekko.cluster.multi-data-center.self-data-center`
   */
  def config(port: Int, dc: String): Config =
    ConfigFactory.parseString(
      s"""
      pekko.remote.artery.canonical.port = $port
      pekko.management.http.port = 1$port
      pekko.cluster.multi-data-center.self-data-center = $dc
    """).withFallback(ConfigFactory.load("application.conf"))

  /**
   * To make the sample easier to run we kickstart an Apache Cassandra instance to
   * act as the journal. Apache Cassandra is a great choice of backend for Apache Pekko Persistence, but
   * in a real application a pre-existing Cassandra cluster should be used.
   *
   * The database files are kept in `target/cassandra-db` and are not cleaned on start
   * (`clean = false`); the instance is started on port 9042.
   */
  def startCassandraDatabase(): Unit = {
    val databaseDirectory = new File("target/cassandra-db")
    CassandraLauncher.start(
      databaseDirectory,
      CassandraLauncher.DefaultTestConfigResource,
      clean = false,
      port = 9042)
  }

}
